Note: Our models take too long to train and to run full games, so we are including our functions here without calling them.
We did not include our Proximal Policy Optimization (PPO) model because it did not work, but its code is in /src.
# --- DQN dependencies and project path setup -------------------------------
from random import random, randint, sample
import os
import sys
import numpy as np
import torch
import torch.nn as nn
import cv2
from collections import deque

# Make the project's DQN sources importable; the path append must happen
# before the `modified_tetris` import below.
current_folder = os.getcwd()
tetris_folder = os.path.join(current_folder, 'src','dqn')
sys.path.append(tetris_folder)
from modified_tetris import Tetris
class DQN(nn.Module):
    """Three-layer MLP that maps a 4-feature Tetris state to a scalar value.

    The layer attributes keep their historical names ``conv1``..``conv3``
    (they are Linear layers, not convolutions) so that previously saved
    state_dicts remain loadable.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Sequential(nn.Linear(4, 64), nn.ReLU(inplace=True))
        self.conv2 = nn.Sequential(nn.Linear(64, 64), nn.ReLU(inplace=True))
        self.conv3 = nn.Sequential(nn.Linear(64, 1))
        self._create_weights()

    def _create_weights(self):
        """Xavier-initialize every Linear weight and zero every bias."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        """Map a (batch, 4) feature tensor to a (batch, 1) value estimate."""
        return self.conv3(self.conv2(self.conv1(x)))
# --- Training hyperparameters ----------------------------------------------
WIDTH = 10 # Width of board
HEIGHT = 20 # Height of board
BLOCK_SIZE = 30 # Block size when rendering
BATCH_SIZE = 512 # High batch size
LEARNING_RATE = 1e-3  # Adam step size
GAMMA = 0.99  # discount factor for future rewards
INITIAL_EPSILON = 1.0  # starting exploration rate
FINAL_EPSILON = 1e-3  # exploration rate once decay finishes
NUM_DECAY_EPOCHS = 1800  # epochs over which epsilon decays linearly
NUM_EPOCHS = 3000  # total number of training updates
SAVE_INTERVAL = 50  # checkpoint the model every N epochs
REPLAY_MEMORY_SIZE = 28000  # max transitions kept in the replay buffer
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DEVICE  # notebook artifact: bare expression that displayed the chosen device
import matplotlib.pyplot as plt
def train():
    """Train a DQN agent on Tetris with an epsilon-greedy replay-buffer loop.

    One "epoch" is one gradient update, performed only after a game ends and
    the replay buffer holds at least REPLAY_MEMORY_SIZE / 10 transitions.
    Saves checkpoints to "saved_model.pth", plots per-game scores at the end,
    and returns the trained model.
    """
    torch.manual_seed(42)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(42)
    env = Tetris(width=WIDTH, height=HEIGHT, block_size=BLOCK_SIZE)
    model = DQN().to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.MSELoss()
    state = env.reset().to(DEVICE)
    replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
    epoch = 0
    # Final score of every finished game, for plotting.
    epoch_scores = []
    while epoch < NUM_EPOCHS:
        next_steps = env.get_next_states()
        # Linearly annealed epsilon-greedy exploration.
        epsilon = FINAL_EPSILON + (max(NUM_DECAY_EPOCHS - epoch, 0) *
                                   (INITIAL_EPSILON - FINAL_EPSILON) / NUM_DECAY_EPOCHS)
        next_actions, next_states = zip(*next_steps.items())
        next_states = torch.stack(next_states).to(DEVICE)
        model.eval()
        with torch.no_grad():
            predictions = model(next_states)[:, 0]
        model.train()
        if random() <= epsilon:
            index = randint(0, len(next_steps) - 1)
        else:
            index = torch.argmax(predictions).item()
        next_state = next_states[index, :].to(DEVICE)
        action = next_actions[index]
        reward, done = env.step(action, render=False)
        replay_memory.append([state, reward, next_state, done])
        if done:
            final_score = env.score
            final_tetrominoes = env.tetrominoes
            final_cleared_lines = env.cleared_lines
            state = env.reset().to(DEVICE)
            epoch_scores.append(final_score)
        else:
            state = next_state
            continue
        # Do not start learning until the buffer has enough transitions.
        if len(replay_memory) < REPLAY_MEMORY_SIZE / 10:
            continue
        epoch += 1
        batch = sample(replay_memory, min(len(replay_memory), BATCH_SIZE))
        state_batch, reward_batch, next_state_batch, done_batch = zip(*batch)
        state_batch = torch.stack(state_batch).to(DEVICE)
        reward_batch = torch.from_numpy(np.array(reward_batch, dtype=np.float32)[:, None]).to(DEVICE)
        next_state_batch = torch.stack(next_state_batch).to(DEVICE)
        q_values = model(state_batch)
        model.eval()
        with torch.no_grad():
            next_prediction_batch = model(next_state_batch)
        model.train()
        # Vectorized Bellman targets: terminal transitions get only the reward.
        # The original built a Python list of one-element tensors and passed it
        # through torch.tensor(...)[:, None], which is deprecated/slow and
        # produced an (N, 1, 1) target that broadcast inside MSELoss against
        # the (N, 1) predictions; this computes the same values with the
        # correct (N, 1) shape.
        done_mask = torch.tensor(done_batch, dtype=torch.float32, device=DEVICE)[:, None]
        y_batch = reward_batch + GAMMA * next_prediction_batch * (1 - done_mask)
        optimizer.zero_grad()
        loss = criterion(q_values, y_batch)
        loss.backward()
        optimizer.step()
        print("Epoch: {}/{}, Action: {}, Score: {}, Tetrominoes {}, Cleared lines: {}".format(
            epoch,
            NUM_EPOCHS,
            action,
            final_score,
            final_tetrominoes,
            final_cleared_lines))
        if epoch > 0 and epoch % SAVE_INTERVAL == 0:
            torch.save(model.state_dict(), "saved_model.pth")
    torch.save(model.state_dict(), "saved_model.pth")
    # Plot per-game score over training.
    plt.figure(figsize=(10, 6))
    plt.plot(range(len(epoch_scores)), epoch_scores, label="Score")
    plt.xlabel("Epochs")
    plt.ylabel("Score")
    plt.title("Score vs. Epochs")
    plt.legend()
    plt.grid()
    plt.show()
    return model
FPS = 300  # frames per second used when rendering evaluation games

def evaluate_model(model, num_games=10):
    """Play `num_games` fully greedy games with `model` and report averages.

    Returns a tuple (avg_score, avg_tetrominoes, avg_lines_cleared).
    """
    model.eval()
    env = Tetris(width=10, height=20, block_size=30)
    total_score = 0
    total_tetrominoes = 0
    total_lines_cleared = 0
    for game in range(num_games):
        _ = env.reset().to(DEVICE)
        game_score = 0
        game_tetrominoes = 0
        game_lines_cleared = 0
        while True:
            # Score every legal placement and take the best one.
            candidates = env.get_next_states()
            actions, feature_states = zip(*candidates.items())
            feature_states = torch.stack(feature_states).to(DEVICE)
            with torch.no_grad():
                values = model(feature_states)[:, 0]
            action = actions[torch.argmax(values).item()]
            _, done = env.step(action,render=False)
            game_score = env.score
            game_tetrominoes = env.tetrominoes
            game_lines_cleared = env.cleared_lines
            if done:
                break
        total_score += game_score
        total_tetrominoes += game_tetrominoes
        total_lines_cleared += game_lines_cleared
        print(f"Game {game + 1}/{num_games} - Score: {game_score}, Tetrominoes: {game_tetrominoes}, Lines Cleared: {game_lines_cleared}")
    avg_score = total_score / num_games
    avg_tetrominoes = total_tetrominoes / num_games
    avg_lines_cleared = total_lines_cleared / num_games
    print(f"\nEvaluation Results:")
    print(f"Average Score: {avg_score}")
    print(f"Average Tetrominoes: {avg_tetrominoes}")
    print(f"Average Lines Cleared: {avg_lines_cleared}")
    return avg_score, avg_tetrominoes, avg_lines_cleared
# --- Load the trained DQN checkpoint ---------------------------------------
current_folder = os.getcwd()
saved_model = os.path.join(current_folder, 'src', 'dqn', 'Trained_Models','adaptation2.pth')
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
agent_tetris = DQN().to(DEVICE)
# weights_only=True avoids the torch.load unpickling FutureWarning and refuses
# arbitrary pickled objects (a state_dict is plain tensors, so it loads fine).
# map_location=DEVICE lets a GPU-trained checkpoint load on CPU and is a no-op
# on GPU, so the same line works for both cases.
agent_tetris.load_state_dict(
    torch.load(saved_model, map_location=DEVICE, weights_only=True))
#avg_score, avg_tetrominoes, avg_lines_cleared = evaluate_model(agent_tetris)
/var/folders/sf/yp04y91j64g6csfzq6jv0vhh0000gn/T/ipykernel_41740/3215735096.py:8: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
agent_tetris.load_state_dict(torch.load(saved_model, map_location=torch.device('cpu')))
<All keys matched successfully>
# Display the recorded DQN gameplay video inline in the notebook.
current_folder = os.getcwd()
from IPython.display import Video
video_path = os.path.join(current_folder, 'src', 'dqn', 'dqn_model.mp4')
Video(video_path, embed=True, width=600)
# --- Genetic-algorithm dependencies and project path setup -----------------
import sys
import os
import numpy as np
# NOTE(review): `import random` rebinds the name `random` from the function
# imported earlier (`from random import random`) to the module — the GA code
# below relies on the module form (random.random(), random.randint()).
import random
import cv2

# Make the project's GA sources importable before importing from them.
current_folder = os.getcwd()
tetris_folder = os.path.join(current_folder, 'src','genetic_algorithm')
sys.path.append(tetris_folder)
import pickle
from tetris import Tetris
class GeneticAgent:
    """Linear-policy Tetris agent: scores board states with a weight vector."""

    def __init__(self, weights=None):
        # Weights for the features: lines_cleared, holes, bumpiness, height.
        # A fresh agent draws its weights uniformly from [-1, 1).
        self.weights = np.random.uniform(-1, 1, 4) if weights is None else weights

    def get_action(self, env):
        """Return the placement whose features maximize the weighted score.

        Ties keep the earliest candidate, matching strict-greater comparison.
        """
        best_action = None
        best_value = -float('inf')
        for action, features in env.get_next_states().items():
            # Linear evaluation: dot product of weights and state features.
            candidate_value = np.dot(self.weights, features.numpy())
            if candidate_value > best_value:
                best_value, best_action = candidate_value, action
        return best_action
class GeneticAlgorithm:
    """Evolves a population of GeneticAgents by fitness-proportional selection,
    single-point crossover, Gaussian mutation, and elitism."""

    def __init__(self, population_size, mutation_rate, crossover_rate, elite_fraction):
        self.population_size = population_size
        self.mutation_rate = mutation_rate      # per-weight probability of Gaussian noise
        self.crossover_rate = crossover_rate    # probability of mixing two parents
        self.elite_fraction = elite_fraction    # fraction of top agents copied unchanged
        self.population = [GeneticAgent() for _ in range(population_size)]

    def evaluate_fitness(self, agents, render=False, grid_size=(4, 4)):
        """Play one full game per agent and return the list of total rewards.

        When `render` is True, per-step frames are collected for every agent
        and replayed as a tiled grid via render_games.
        """
        fitness_scores = []
        frames = []
        for idx, agent in enumerate(agents):
            env = Tetris()
            total_score = 0
            done = False
            while not done:
                action = agent.get_action(env)
                reward, done = env.step(action, render=False)
                total_score += reward
                if render:
                    frame = env.render(mode='rgb_array')
                    frames.append((idx, frame))
            fitness_scores.append(total_score)
        if render:
            self.render_games(frames, grid_size)
        return fitness_scores

    def render_games(self, frames, grid_size):
        """Replay collected (agent_idx, frame) pairs as a tiled cv2 window.

        Press Esc to stop the replay early.
        """
        from collections import defaultdict
        if not frames:
            # Nothing was recorded; avoid max() over an empty sequence.
            return
        agent_frames = defaultdict(list)
        for idx, frame in frames:
            agent_frames[idx].append(frame)
        max_frames = max(len(f) for f in agent_frames.values())
        for frame_idx in range(max_frames):
            grid_frames = []
            for idx in range(self.population_size):
                clips = agent_frames.get(idx)
                if not clips:
                    # Agent recorded no frames (shouldn't happen in practice):
                    # show a blank tile sized like any recorded frame instead
                    # of crashing on an empty list.
                    reference = next(iter(agent_frames.values()))[0]
                    grid_frames.append(np.zeros_like(reference))
                elif frame_idx < len(clips):
                    grid_frames.append(clips[frame_idx])
                else:
                    # This agent's game already ended: hold its last frame.
                    grid_frames.append(clips[-1])
            grid_img = self.create_grid(grid_frames, grid_size)
            cv2.imshow("Genetic Algorithm Tetris", grid_img)
            key = cv2.waitKey(1)
            if key == 27:  # Esc key to stop
                cv2.destroyAllWindows()
                return

    def create_grid(self, frames, grid_size):
        """Tile `frames` into a grid_size[0] x grid_size[1] image.

        Missing cells are filled with black tiles of the same frame size.
        """
        rows = []
        idx = 0
        frame_height, frame_width, _ = frames[0].shape
        for _ in range(grid_size[0]):
            row_frames = []
            for _ in range(grid_size[1]):
                if idx < len(frames):
                    frame = frames[idx]
                else:
                    frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
                row_frames.append(frame)
                idx += 1
            rows.append(np.hstack(row_frames))
        return np.vstack(rows)

    def select_parents(self, fitness_scores):
        """Sample two parents with probability proportional to fitness.

        Scores are first shifted to be non-negative — raw game rewards can be
        negative, and np.random.choice raises on negative probabilities (the
        original crashed in that case). If every shifted score is zero the
        selection is uniform.
        """
        scores = np.asarray(fitness_scores, dtype=np.float64)
        min_score = scores.min()
        if min_score < 0:
            scores = scores - min_score
        total_fitness = scores.sum()
        if total_fitness == 0:
            selection_probs = np.full(scores.size, 1.0 / scores.size)
        else:
            selection_probs = scores / total_fitness
        parents = np.random.choice(self.population, size=2, p=selection_probs)
        return parents

    def crossover(self, parent1, parent2):
        """Return a child: single-point crossover with probability
        crossover_rate, otherwise a copy of parent1."""
        if random.random() < self.crossover_rate:
            crossover_point = random.randint(1, len(parent1.weights) - 1)
            child_weights = np.concatenate(
                (parent1.weights[:crossover_point], parent2.weights[crossover_point:])
            )
            return GeneticAgent(weights=child_weights)
        else:
            return GeneticAgent(weights=parent1.weights.copy())

    def mutate(self, agent):
        """Perturb each weight with standard-normal noise at mutation_rate;
        mutates in place and returns the same agent."""
        for i in range(len(agent.weights)):
            if random.random() < self.mutation_rate:
                agent.weights[i] += np.random.normal()
        return agent

    def evolve(self, render=False, grid_size=(4, 4)):
        """Run one generation: evaluate, keep elites, breed the rest.

        Returns (max_fitness, avg_fitness, min_fitness) of the evaluated
        (pre-evolution) population.
        """
        fitness_scores = self.evaluate_fitness(self.population, render=render, grid_size=grid_size)
        new_population = []
        # Elitism: carry the top-performing agents over unchanged.
        elite_size = int(self.elite_fraction * self.population_size)
        elite_indices = np.argsort(fitness_scores)[-elite_size:]
        elite_agents = [self.population[i] for i in elite_indices]
        new_population.extend(elite_agents)
        # Breed children until the population is refilled.
        while len(new_population) < self.population_size:
            parent1, parent2 = self.select_parents(fitness_scores)
            child = self.crossover(parent1, parent2)
            child = self.mutate(child)
            new_population.append(child)
        self.population = new_population
        max_fitness = max(fitness_scores)
        avg_fitness = sum(fitness_scores) / len(fitness_scores)
        min_fitness = min(fitness_scores)
        return max_fitness, avg_fitness, min_fitness
Here we use the genetic algorithm above to run a 2-generation training session with the same hyperparameters that were used for our best run. This will pop up a Genetic Algorithm Training window, rendered to show how the generations and population work. If you get lucky in the first or second generation, please stop the runtime of the code below or it will run for a while.
def train_snippet():
    """Demo: evolve a 4-agent population for two rendered generations."""
    GENERATIONS = 2
    ga = GeneticAlgorithm(
        population_size=4,
        mutation_rate=0.1,
        crossover_rate=0.7,
        elite_fraction=0.2,
    )
    print("Starting training snippet with rendering...")
    for gen in range(GENERATIONS):
        print(f"Generation {gen + 1}")
        ga.evolve(render=True, grid_size=(2, 2))
    print("Training complete.")


train_snippet()
Starting training snippet with rendering... Generation 1
2024-12-11 21:53:52.429 python[41740:17189686] +[IMKClient subclass]: chose IMKClient_Modern 2024-12-11 21:53:52.429 python[41740:17189686] +[IMKInputSession subclass]: chose IMKInputSession_Modern
Generation 2 Training complete.
This snippet showcases the end of a random run that we did with our best trained agent.
# Display a 30-second clip of the best trained GA agent, followed by the
# fitness-trend plot from training.
current_folder = os.getcwd()
from IPython.display import Video
video_path = os.path.join(current_folder, 'src', 'genetic_algorithm', 'Results', '30secondClip.mp4')
Video(video_path, embed=True, width=600)
from IPython.display import Image
Image(filename="src/genetic_algorithm/Results/fitness_trends.png")